package cn.com.bonc.app;

import cn.com.bonc.conf.ConfigurationManager;
import cn.com.bonc.constant.Constants;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQuery;
import scala.Tuple2;

import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;

/*
 * Created by IntelliJ IDEA.
 * User: rql
 * Date: 2019/1/2
 * Time: 14:41
 */


/**
 * Streaming word count over a sliding event-time window.
 *
 * <p>Reads delimited lines from Kafka, splits each record's value into words while
 * retaining the Kafka record timestamp, then counts words per sliding window
 * (window and slide durations are read from configuration, in seconds) and prints
 * the running totals to the console in "complete" output mode.
 */
public class StructuredStreamingWindow {

    /**
     * Entry point: wires source → transformation → console sink and blocks until
     * the streaming query terminates.
     *
     * @param args unused
     * @throws Exception if the streaming query fails or is interrupted
     */
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder()
                .appName("StructuredStreamingWindow")
                .master("local[4]")
                .getOrCreate();

        Dataset<Row> words = wordsWithTimestamps(readFromKafka(spark));
        words.printSchema();

        // Durations are configured as plain numbers of seconds.
        String windowDuration = ConfigurationManager.getProperty(Constants.WINDOW_DURATION) + " seconds";
        String slideDuration = ConfigurationManager.getProperty(Constants.SLIDE_DURATION) + " seconds";

        // Group by sliding window and word, then count each group. Sorting a
        // streaming aggregation is only legal because the sink below uses
        // "complete" output mode.
        Dataset<Row> windowedCounts = words.groupBy(
                functions.window(words.col("timestamp"), windowDuration, slideDuration),
                words.col("word")
        ).count().orderBy("window");

        // Print the full windowed counts table on every trigger.
        StreamingQuery query = windowedCounts.writeStream()
                .outputMode("complete")
                .format("console")
                .option("truncate", "false")
                .start();

        query.awaitTermination();
    }

    /**
     * Builds the streaming source: raw records from the configured Kafka topic(s),
     * starting from the earliest available offsets.
     *
     * <p>NOTE(review): the original code also set {@code includeTimestamp}, which is
     * a socket-source option; the Kafka source always exposes a {@code timestamp}
     * column, so the option was inert and has been dropped.
     */
    private static Dataset<Row> readFromKafka(SparkSession spark) {
        return spark
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", ConfigurationManager.getProperty(Constants.KAFKA_BOOTSTRAP_SERVERS))
                .option("subscribe", ConfigurationManager.getProperty(Constants.KAFKA_TOPICS))
                .option("startingOffsets", "earliest")
                .load();
    }

    /**
     * Splits each record's string value on the configured separator, emitting one
     * (word, timestamp) row per word and keeping the originating record timestamp.
     */
    private static Dataset<Row> wordsWithTimestamps(Dataset<Row> lines) {
        return lines
                .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)")
                .as(Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP()))
                .flatMap((FlatMapFunction<Tuple2<String, Timestamp>, Tuple2<String, Timestamp>>) t -> {
                            List<Tuple2<String, Timestamp>> result = new ArrayList<>();
                            // Backslash-escape the separator ("|") so split() treats
                            // it as a literal character, not a regex alternation.
                            for (String word : t._1.split("\\" + Constants.SEPARATOR_VERTICAL)) {
                                result.add(new Tuple2<>(word, t._2));
                            }
                            return result.iterator();
                        },
                        Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP())
                ).toDF("word", "timestamp");
    }
}



